{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Data Generators\n",
    "\n",
    "author: Jacob Schreiber <br>\n",
    "contact: jmschreiber91@gmail.com\n",
    "\n",
    "The most common way to pass data to machine learning models, like those defined using scikit-learn or keras, is in the format of a numpy array. However, there are times when this is either not possible, like in the case where the data set is so big that it cannot fit in memory, or simply inconvenient, like in the case where the data exists in some other format. Further, once the numpy array is passed in, control over the training process becomes limited to whatever built-in commands the package happens to support.\n",
    "\n",
    "The concept of a data generator is a solution to both of these problems. Essentially, a data generator wraps a data set and yields batches of data. The pomegranate models then simply request batches from the generators, rather than handling the work of partitioning the data set themselves. Thus, generators become a flexible solution to a variety of problems because the user can define how exactly these batches are generated, including how data is read in, what pre-processing steps are performed on it, and when the epoch ends.\n",
    "\n",
    "The internal fitting loop of most pomegranate models looks something like this:\n",
    "\n",
    "```python\n",
    "with Parallel(n_jobs=n_jobs, backend='threading') as parallel:\n",
    "    f = delayed(self.summarize, check_pickle=False)\n",
    "\n",
    "    while improvement > stop_threshold and iteration < max_iterations + 1:\n",
    "        # Update parameters from the stored sufficient statistics\n",
    "        self.from_summaries(step_size, pseudocount)\n",
    "\n",
    "        # Calculate new sufficient statistics from the data\n",
    "        log_probability_sum = sum(parallel(f(*batch) for batch in \n",
    "            data_generator.batches()))\n",
    "```\n",
    "\n",
    "The code is fairly simple. First, the model creates a thread pool for parallel processing (defaulting back to sequential execution if `n_jobs=1`). Then, there is a loop that continues until either convergence or the maximum number of iterations is reached. Inside that loop, there is a parameter update step and a batch summarization step. This may seem out of order but is written this way for efficiency reasons we won't get into here. However, the important aspect is that the data generator has a method, `batches`, that generates batches for each epoch. These batches are then processed, either sequentially or in parallel, by the `summarize` method. Importantly, it is the data generator that returns these batches rather than some hard-coded process."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Sun Dec 01 2019 \n",
      "\n",
      "numpy 1.17.2\n",
      "scipy 1.3.1\n",
      "pomegranate| not installed\n",
      "\n",
      "compiler   : GCC 7.3.0\n",
      "system     : Linux\n",
      "release    : 4.15.0-66-generic\n",
      "machine    : x86_64\n",
      "processor  : x86_64\n",
      "CPU cores  : 8\n",
      "interpreter: 64bit\n"
     ]
    }
   ],
   "source": [
    "%matplotlib inline\n",
    "import numpy\n",
    "import pandas\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn; seaborn.set_style('whitegrid')\n",
    "\n",
    "from pomegranate import *\n",
    "\n",
    "numpy.random.seed(0)\n",
    "numpy.set_printoptions(suppress=True)\n",
    "\n",
    "%load_ext watermark\n",
    "%watermark -m -n -p numpy,scipy,pomegranate|"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Using a pre-defined generator\n",
    "\n",
    "Currently, there are three generators that are built-in to pomegranate: `DataGenerator`, `SequenceGenerator`, and `DataFrameGenerator`. `DataGenerator` is a somewhat bland generator that takes in a data set and yields sequential chunks of data until the entire data set has been seen. SequenceGenerator takes a list of sequences and yields them one at a time, for use mostly for `HiddenMarkovModel` objects. Finally, `DataFrameGenerator` takes a pandas DataFrame and yields batches from it.\n",
    "\n",
    "Let's take a look at learning a Bayesian network using a pandas DataFrame."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>bool</th>\n",
       "      <th>str</th>\n",
       "      <th>int1</th>\n",
       "      <th>int2</th>\n",
       "      <th>color</th>\n",
       "      <th>int3</th>\n",
       "      <th>int4</th>\n",
       "      <th>int5</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>True</td>\n",
       "      <td>B</td>\n",
       "      <td>0</td>\n",
       "      <td>1</td>\n",
       "      <td>red</td>\n",
       "      <td>0</td>\n",
       "      <td>1</td>\n",
       "      <td>1</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>False</td>\n",
       "      <td>A</td>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "      <td>green</td>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "      <td>1</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>False</td>\n",
       "      <td>A</td>\n",
       "      <td>1</td>\n",
       "      <td>3</td>\n",
       "      <td>orange</td>\n",
       "      <td>1</td>\n",
       "      <td>1</td>\n",
       "      <td>2</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>True</td>\n",
       "      <td>B</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>blue</td>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>False</td>\n",
       "      <td>A</td>\n",
       "      <td>0</td>\n",
       "      <td>3</td>\n",
       "      <td>green</td>\n",
       "      <td>1</td>\n",
       "      <td>1</td>\n",
       "      <td>0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "    bool str int1 int2   color int3 int4 int5\n",
       "0   True   B    0    1     red    0    1    1\n",
       "1  False   A    1    0   green    1    0    1\n",
       "2  False   A    1    3  orange    1    1    2\n",
       "3   True   B    0    0    blue    1    0    0\n",
       "4  False   A    0    3   green    1    1    0"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "numpy.random.seed(0)\n",
    "\n",
    "X = numpy.array([\n",
    "    numpy.random.choice([True, False], size=10),\n",
    "    numpy.random.choice(['A', 'B'], size=10),\n",
    "    numpy.random.choice(2, size=10),\n",
    "    numpy.random.choice(4, size=10),\n",
    "    numpy.random.choice(['orange', 'blue', 'red', 'green'], size=10),\n",
    "    numpy.random.choice(2, size=10),\n",
    "    numpy.random.choice(2, size=10),\n",
    "    numpy.random.choice(3, size=10)\n",
    "], dtype=object).T.copy()\n",
    "\n",
    "X_df = pandas.DataFrame(X, columns=['bool', 'str', 'int1', 'int2', 'color', 'int3', 'int4', 'int5'])\n",
    "X_df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "((), (0,), (1,), (5,), (), (), (), (5,))"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model1 = BayesianNetwork.from_samples(X_df, state_names=X_df.columns)\n",
    "model1.structure"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "((), (0,), (1,), (5,), (), (), (), (5,))"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model2 = BayesianNetwork.from_samples(X, state_names=X_df.columns)\n",
    "model2.structure"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see that we're getting the same structure regardless of if we use a numpy array or a DataFrame. This is because, internally, each object that is not a data generator gets wrapped by `DataGenerator` before being used internally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([{\n",
       "    \"class\" :\"Distribution\",\n",
       "    \"dtype\" :\"bool\",\n",
       "    \"name\" :\"DiscreteDistribution\",\n",
       "    \"parameters\" :[\n",
       "        {\n",
       "            \"True\" :0.6666666666666666,\n",
       "            \"False\" :0.3333333333333333\n",
       "        }\n",
       "    ],\n",
       "    \"frozen\" :false\n",
       "},\n",
       "       'B',\n",
       "       {\n",
       "    \"class\" :\"Distribution\",\n",
       "    \"dtype\" :\"int\",\n",
       "    \"name\" :\"DiscreteDistribution\",\n",
       "    \"parameters\" :[\n",
       "        {\n",
       "            \"0\" :1.0,\n",
       "            \"1\" :0.0\n",
       "        }\n",
       "    ],\n",
       "    \"frozen\" :false\n",
       "},\n",
       "       {\n",
       "    \"class\" :\"Distribution\",\n",
       "    \"dtype\" :\"int\",\n",
       "    \"name\" :\"DiscreteDistribution\",\n",
       "    \"parameters\" :[\n",
       "        {\n",
       "            \"0\" :0.29999999999999993,\n",
       "            \"1\" :0.2000000000000001,\n",
       "            \"2\" :0.1000000000000001,\n",
       "            \"3\" :0.3999999999999999\n",
       "        }\n",
       "    ],\n",
       "    \"frozen\" :false\n",
       "},\n",
       "       'blue',\n",
       "       {\n",
       "    \"class\" :\"Distribution\",\n",
       "    \"dtype\" :\"int\",\n",
       "    \"name\" :\"DiscreteDistribution\",\n",
       "    \"parameters\" :[\n",
       "        {\n",
       "            \"0\" :0.4000000000000002,\n",
       "            \"1\" :0.5999999999999999\n",
       "        }\n",
       "    ],\n",
       "    \"frozen\" :false\n",
       "},\n",
       "       {\n",
       "    \"class\" :\"Distribution\",\n",
       "    \"dtype\" :\"int\",\n",
       "    \"name\" :\"DiscreteDistribution\",\n",
       "    \"parameters\" :[\n",
       "        {\n",
       "            \"1\" :0.30000000000000004,\n",
       "            \"0\" :0.6999999999999998\n",
       "        }\n",
       "    ],\n",
       "    \"frozen\" :false\n",
       "},\n",
       "       {\n",
       "    \"class\" :\"Distribution\",\n",
       "    \"dtype\" :\"int\",\n",
       "    \"name\" :\"DiscreteDistribution\",\n",
       "    \"parameters\" :[\n",
       "        {\n",
       "            \"0\" :0.29999999999999993,\n",
       "            \"1\" :0.4,\n",
       "            \"2\" :0.3\n",
       "        }\n",
       "    ],\n",
       "    \"frozen\" :false\n",
       "}], dtype=object)"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model1.predict_proba({'color': 'blue', 'str': 'B'})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It is important to note that Bayesian network structure learning does not operate in a batch-wise fashion. Rather, the data set is entirely generated by exhausting the generator in the `batches` method. In this case, the generator serves primarily as a way to convert a pandas DataFrame to a numpy array for use by the model."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### How do you define a data generator?\n",
    "\n",
    "The `BaseGenerator` looks basically like the following:\n",
    "\n",
    "```python\n",
    "class BaseGenerator(object):\n",
    "\tdef __init__(self):\n",
    "\t\tpass\n",
    "\n",
    "\tdef __len__(self):\n",
    "\t\treturn NotImplementedError\n",
    "\n",
    "\t@property\n",
    "\tdef shape(self):\n",
    "\t\treturn NotImplementedError\n",
    "    \n",
    "    @property\n",
    "    def ndim(self):\n",
    "        return NotImplementedError\n",
    "    \n",
    "    def batches(self):\n",
    "        return NotImplementedError\n",
    "    \n",
    "    def unlabeled_batches(self):\n",
    "        return NotImplementedError\n",
    "    \n",
    "    def labeled_batches(self):\n",
    "        return NotImplementedError\n",
    "```\n",
    "\n",
    "Your custom data generator has to implement the subset of methods that will be relevant for your task. \n",
    "\n",
    "The first few methods are related to managing the generator. The `__init__` method should take in the parameters involved with loading the data set and how batches are to be generated, e.g. batch size, shuffling, etc.. The `__len__` method should define the total number of examples in the data set, even when they aren't all read into disk. The `shape` and `ndim` methods are the counterparts to the numpy methods. While `shape` and `__len__` are somewhat redundant with each other they are both necessary because some operations within pomegranate need to work regardless if a list, a numpy array, or a data generator have been passed in.\n",
    "\n",
    "The remaining methods involve generating batches of data to operate on. The `batches` method should be a generator that yields batches until the epoch is over. This can be as simple as contiguous chunks of data until the entire data set has been seen (as in `DataGenerator`) or as complicated as randomly sampling examples until some pre-specified number of examples have been seen. Because each iteration of the training loop involves another call to `batches` the generator becomes reset each epoch. The `unlabeled_batches` and `labeled_batches` are used in the case of semi-supervised learning and, respectively, generate only examples that do are unlabeled from the data set and those that are labeled. These do not have to be implemented if the use case will never rely on semi-supervised learning.\n",
    "\n",
    "### Building a custom out-of-core data generator\n",
    "\n",
    "As an example, let's build an out-of-core data generator that wraps a numpy array that lives on disk. The purpose of this generator would be to be able to generate batches from a data set without having to read the entire thing into memory. Because we are writing a data generator to do this, we don't have to rely on pomegranate internally being able to handle out-of-core operations; all we have to do is define how one would generate batches from it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pomegranate.io import BaseGenerator\n",
    "\n",
    "class MemoryMapGenerator(BaseGenerator):\n",
    "    def __init__(self, filename, weights=None, batch_size=None):\n",
    "        self.X = numpy.load(filename, mmap_mode='r')\n",
    "        \n",
    "        if weights is None:\n",
    "            self.weights = numpy.ones(self.X.shape[0])\n",
    "        else:\n",
    "            self.weights = numpy.load(weights, mmap_mode='r')\n",
    "        \n",
    "        if batch_size is None:\n",
    "            self.batch_size = self.X.shape[0]\n",
    "        else:\n",
    "            self.batch_size = batch_size\n",
    "            \n",
    "    def __len__(self):\n",
    "        return len(self.X)\n",
    "\n",
    "    @property\n",
    "    def shape(self):\n",
    "        return self.X.shape\n",
    "\n",
    "    @property\n",
    "    def ndim(self):\n",
    "        return self.X.ndim\n",
    "\n",
    "    def batches(self):\n",
    "        start, end = 0, self.batch_size\n",
    "        iteration = 0\n",
    "\n",
    "        while start < len(self):\n",
    "            yield self.X[start:end], self.weights[start:end]\n",
    "\n",
    "            start += self.batch_size\n",
    "            end += self.batch_size\n",
    "            iteration += 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The implementation here was fairly straightforward, mostly because numpy is already super convenient when it comes to handling out-of-core data sets that are stored as numpy formatted arrays. All the generator does is load up slices of data (and corresponding slices of weights) and yields them until all examples have been seen."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "X = numpy.random.randn(10000, 70)\n",
    "numpy.save(\"X_test.npy\", X)\n",
    "\n",
    "X_generator = MemoryMapGenerator(\"X_test.npy\", batch_size=1000)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's train a model using the original data set and by using a generator defined on the data set that now lives on disk"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[1] Improvement: 22728.663993260125\tTime (s): 0.01303\n",
      "[2] Improvement: 351.98156800703146\tTime (s): 0.01358\n",
      "[3] Improvement: 196.71634020493366\tTime (s): 0.01322\n",
      "[4] Improvement: 136.9181309074629\tTime (s): 0.0132\n",
      "[5] Improvement: 98.06255334301386\tTime (s): 0.01312\n",
      "[6] Improvement: 73.51235836837441\tTime (s): 0.01331\n",
      "[7] Improvement: 57.17729074554518\tTime (s): 0.01358\n",
      "[8] Improvement: 46.950419555883855\tTime (s): 0.01334\n",
      "[9] Improvement: 40.84007772023324\tTime (s): 0.01323\n",
      "[10] Improvement: 36.80416104476899\tTime (s): 0.01291\n",
      "Total Improvement: 23767.626893157372\n",
      "Total Time (s): 0.1473\n"
     ]
    }
   ],
   "source": [
    "from pomegranate import GeneralMixtureModel, MultivariateGaussianDistribution\n",
    "\n",
    "d = MultivariateGaussianDistribution\n",
    "model1 = GeneralMixtureModel.from_samples(d, 2, X, verbose=True, batch_size=1000, max_iterations=10, init='first-k')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[1] Improvement: 22728.663993260125\tTime (s): 0.01284\n",
      "[2] Improvement: 351.98156800703146\tTime (s): 0.01327\n",
      "[3] Improvement: 196.71634020493366\tTime (s): 0.013\n",
      "[4] Improvement: 136.9181309074629\tTime (s): 0.01306\n",
      "[5] Improvement: 98.06255334301386\tTime (s): 0.01332\n",
      "[6] Improvement: 73.51235836837441\tTime (s): 0.01341\n",
      "[7] Improvement: 57.17729074554518\tTime (s): 0.01342\n",
      "[8] Improvement: 46.950419555883855\tTime (s): 0.01333\n",
      "[9] Improvement: 40.84007772023324\tTime (s): 0.01305\n",
      "[10] Improvement: 36.80416104476899\tTime (s): 0.01355\n",
      "Total Improvement: 23767.626893157372\n",
      "Total Time (s): 0.1456\n"
     ]
    }
   ],
   "source": [
    "model2 = GeneralMixtureModel.from_samples(d, 2, X_generator, verbose=True, max_iterations=10, init='first-k')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It looks like we're getting the same improvements and that it's not actually significantly slower to load up batches in an out-of-core manner than to generate them from a data set that lives in memory."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
